{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "JAPoU8Sm5E6e"
      },
      "source": [
        "# Feature Store: Streaming ingestion SDK"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "24743cf4a1e1"
      },
      "source": [
        "**NOTE**: This notebook has been tested in the following environment:\n",
        "\n",
        "* Python version = 3.9"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "tvgnzT1CKxrO"
      },
      "source": [
        "## Overview\n",
        "\n",
        "This notebook demonstrates how to use Vertex AI Feature Store's streaming ingestion at the SDK layer.\n",
        "\n",
        "Learn more about [Vertex AI Feature Store](https://cloud.google.com/vertex-ai/docs/featurestore)."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "d975e698c9a4"
      },
      "source": [
        "### Objective\n",
        "\n",
        "In this tutorial, you learn how to ingest features from a `Pandas DataFrame` into your Vertex AI Feature Store using `write_feature_values` method from the Vertex AI SDK.\n",
        "\n",
        "This tutorial uses the following Google Cloud ML services and resources:\n",
        "\n",
        "- Vertex AI Feature Store\n",
        "\n",
        "\n",
        "The steps performed include:\n",
        "\n",
        "- Create `Feature Store`\n",
        "- Create new `Entity Type` for your `Feature Store`\n",
        "- Ingest feature values from `Pandas DataFrame` into `Feature Store`'s `Entity Types`."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "08d289fa873f"
      },
      "source": [
        "### Dataset\n",
        "\n",
        "The dataset used for this notebook is the penguins dataset from [BigQuery public datasets](https://cloud.google.com/bigquery/public-data). This dataset has the following features: `culmen_length_mm`, `culmen_depth_mm`, `flipper_length_mm`, `body_mass_g`, `species`, and `sex`."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "aed92deeb4a0"
      },
      "source": [
        "### Costs\n",
        "\n",
        "This tutorial uses billable components of Google Cloud:\n",
        "\n",
        "* Vertex AI\n",
        "\n",
        "Learn about [Vertex AI\n",
        "pricing](https://cloud.google.com/vertex-ai/pricing) and use the [Pricing\n",
        "Calculator](https://cloud.google.com/products/calculator/)\n",
        "to generate a cost estimate based on your projected usage.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "i7EUnXsZhAGF"
      },
      "source": [
        "## Installation\n",
        "\n",
        "Install the following packages required to execute this notebook."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "2b4ef9b72d43"
      },
      "outputs": [],
      "source": [
        "# Install the packages\n",
        "! pip3 install --upgrade google-cloud-aiplatform\\\n",
        "                         google-cloud-bigquery\\\n",
        "                         numpy\\\n",
        "                         pandas\\\n",
        "                         db-dtypes\\\n",
        "                         pyarrow -q\\\n",
        "                         --user"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "**Note:** You can ignore the dependency and incompatibility errors."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "58707a750154"
      },
      "source": [
        "### Restart the kernel"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "f200f10a1da3"
      },
      "outputs": [],
      "source": [
        "# Automatically restart kernel after installs so that your environment can access the new packages\n",
        "import IPython\n",
        "\n",
        "app = IPython.Application.instance()\n",
        "app.kernel.do_shutdown(True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "BF1j6f9HApxa"
      },
      "source": [
        "## Before you begin\n",
        "\n",
        "### Set up your Google Cloud project\n",
        "\n",
        "**The following steps are required, regardless of your notebook environment.**\n",
        "\n",
        "1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.\n",
        "\n",
        "2. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).\n",
        "\n",
        "3. [Enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).\n",
        "\n",
        "4. If you are running this notebook locally, you need to install the [Cloud SDK](https://cloud.google.com/sdk)."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "WReHDGG5g0XY"
      },
      "source": [
        "#### Set your project ID\n",
        "\n",
        "**If you don't know your project ID**, try the following:\n",
        "* Run `gcloud config list`.\n",
        "* Run `gcloud projects list`.\n",
        "* See the support page: [Locate the project ID](https://support.google.com/googleapi/answer/7014113)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "oM1iC_MfAts1"
      },
      "outputs": [],
      "source": [
        "PROJECT_ID = \"[your-project-id]\"  # Replace with your project-id\n",
        "\n",
        "# Set the project id\n",
        "! gcloud config set project {PROJECT_ID}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "region"
      },
      "source": [
        "#### Region\n",
        "\n",
        "You can also change the `REGION` variable used by Vertex AI. Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "kljmKgilI_de"
      },
      "outputs": [],
      "source": [
        "REGION = \"us-central1\"  # Replace with your region"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "EsCYkJ4IU-z4"
      },
      "source": [
        "### UUID\n",
        "\n",
        "If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on resources created, you create a uuid for each instance session, and append it onto the name of resources you create in this tutorial."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "4jWj2DSTU9my"
      },
      "outputs": [],
      "source": [
        "import random\n",
        "import string\n",
        "\n",
        "\n",
        "# Generate a uuid of a specifed length(default=8)\n",
        "def generate_uuid(length: int = 8) -> str:\n",
        "    return \"\".join(random.choices(string.ascii_lowercase + string.digits, k=length))\n",
        "\n",
        "\n",
        "UUID = generate_uuid()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "960505627ddf"
      },
      "source": [
        "### Import libraries"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "PyQmSRbKA8r-"
      },
      "outputs": [],
      "source": [
        "import numpy as np\n",
        "import pandas as pd\n",
        "from google.cloud import aiplatform, bigquery"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "init_aip:mbsdk,all"
      },
      "source": [
        "### Initialize Vertex AI SDK for Python\n",
        "\n",
        "Initialize the Vertex AI SDK for Python for your project."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "0ep8KuQhI_df"
      },
      "outputs": [],
      "source": [
        "aiplatform.init(project=PROJECT_ID, location=REGION)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "k5XsEiAuEWUJ"
      },
      "source": [
        "## Download and prepare the data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "rOd7Ixa1pqBY"
      },
      "outputs": [],
      "source": [
        "def download_bq_table(bq_table_uri: str) -> pd.DataFrame:\n",
        "    # Remove bq:// prefix if present\n",
        "    prefix = \"bq://\"\n",
        "    if bq_table_uri.startswith(prefix):\n",
        "        bq_table_uri = bq_table_uri[len(prefix) :]\n",
        "\n",
        "    table = bigquery.TableReference.from_string(bq_table_uri)\n",
        "\n",
        "    # Create a BigQuery client\n",
        "    bqclient = bigquery.Client(project=PROJECT_ID)\n",
        "\n",
        "    # Download the table rows\n",
        "    rows = bqclient.list_rows(\n",
        "        table,\n",
        "    )\n",
        "    return rows.to_dataframe()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "SdX_m1Uppkfu"
      },
      "outputs": [],
      "source": [
        "BQ_SOURCE = \"bq://bigquery-public-data.ml_datasets.penguins\"\n",
        "\n",
        "# Download penguins BigQuery table\n",
        "penguins_df = download_bq_table(BQ_SOURCE)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "QuQe6mSbFbhm"
      },
      "source": [
        "### Prepare the data\n",
        "\n",
        "Feature values to be written to the Feature Store can take the form of a list of `WriteFeatureValuesPayload` objects, a Python `dict` of the form\n",
        "\n",
        "`{entity_id : {feature_id : feature_value}, ...},`\n",
        "\n",
        "or a pandas `Dataframe`, where the `index` column holds the unique entity ID strings and each remaining column represents a feature.  In this notebook, since you use a pandas `DataFrame` for ingesting features we convert the index column data type to `string` to be used as `Entity ID`."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "cljxzJ3bqDer"
      },
      "outputs": [],
      "source": [
        "# Prepare the data\n",
        "penguins_df.index = penguins_df.index.map(str)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "GSxrSdSY2ovn"
      },
      "outputs": [],
      "source": [
        "# Remove null values\n",
        "NA_VALUES = [\"NA\", \".\"]\n",
        "penguins_df = penguins_df.replace(to_replace=NA_VALUES, value=np.nan).dropna()"
      ]
    },
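    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The `dict` form described above can also be built from plain Python records. The following is a minimal sketch under stated assumptions: the sample records, the `penguin_id` key, and the `records_to_instances` helper are hypothetical names used only for illustration. They aren't part of the Vertex AI SDK, and the sample values aren't taken from the penguins dataset."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from typing import Any, Dict, List\n",
        "\n",
        "# Hypothetical sample records; the values are illustrative only\n",
        "sample_records: List[Dict[str, Any]] = [\n",
        "    {\"penguin_id\": 0, \"species\": \"Adelie\", \"body_mass_g\": 3750.0},\n",
        "    {\"penguin_id\": 1, \"species\": \"Gentoo\", \"body_mass_g\": 5000.0},\n",
        "]\n",
        "\n",
        "\n",
        "def records_to_instances(\n",
        "    records: List[Dict[str, Any]], id_key: str\n",
        ") -> Dict[str, Dict[str, Any]]:\n",
        "    \"\"\"Build {entity_id: {feature_id: feature_value}} with string entity IDs.\"\"\"\n",
        "    instances: Dict[str, Dict[str, Any]] = {}\n",
        "    for record in records:\n",
        "        # Entity IDs are strings, so convert the ID field explicitly\n",
        "        entity_id = str(record[id_key])\n",
        "        # Every field except the ID becomes a feature value\n",
        "        instances[entity_id] = {k: v for k, v in record.items() if k != id_key}\n",
        "    return instances\n",
        "\n",
        "\n",
        "sample_instances = records_to_instances(sample_records, id_key=\"penguin_id\")\n",
        "print(sample_instances)"
      ]
    },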
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "vgn4oQmSqdKI"
      },
      "source": [
        "## Create Feature Store and define schemas\n",
        "\n",
        "Vertex AI Feature Store organizes resources hierarchically in the following order:\n",
        "\n",
        "`Featurestore -> EntityType -> Feature`\n",
        "\n",
        "You must create these resources before you can ingest data into Vertex AI Feature Store.\n",
        "\n",
        "Learn more about [Vertex AI Feature Store](https://cloud.google.com/vertex-ai/docs/featurestore)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "yaHwdbGjZWTq"
      },
      "source": [
        "### Create a Feature Store\n",
        "\n",
        "You create a Feature Store using `aiplatform.Featurestore.create` with the following parameters:\n",
        "\n",
        "* `featurestore_id (str)`: The ID to use for this Featurestore, which will become the final component of the Featurestore's resource name. The value must be unique within the project and location.\n",
        "* `online_store_fixed_node_count`: Configuration for online serving resources.\n",
        "* `project`: Project to create EntityType in. If not set, project set in `aiplatform.init` is used.\n",
        "* `location`: Location to create EntityType in. If not set, location set in `aiplatform.init` is used.\n",
        "* `sync`:  Whether to execute this creation synchronously."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "cImsONglqfxO"
      },
      "outputs": [],
      "source": [
        "FEATURESTORE_ID = f\"penguins_{UUID}\"\n",
        "\n",
        "penguins_feature_store = aiplatform.Featurestore.create(\n",
        "    featurestore_id=FEATURESTORE_ID,\n",
        "    online_store_fixed_node_count=1,\n",
        "    project=PROJECT_ID,\n",
        "    location=REGION,\n",
        "    sync=True,\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "UfXgSD1VdzKb"
      },
      "source": [
        "##### Verify that the Feature Store is created\n",
        "Check if the Feature Store was successfully created by running the following code block."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "oud1OdfQd52r"
      },
      "outputs": [],
      "source": [
        "fs = aiplatform.Featurestore(\n",
        "    featurestore_name=FEATURESTORE_ID,\n",
        "    project=PROJECT_ID,\n",
        "    location=REGION,\n",
        ")\n",
        "print(fs.gca_resource)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ep74rSlJWF3c"
      },
      "source": [
        "### Create an EntityType\n",
        "\n",
        "An entity type is a collection of semantically related features. You define your own entity types, based on the concepts that are relevant to your use case. For example, a movie service might have the entity types `movie` and `user`, which group related features that correspond to movies or users.\n",
        "\n",
        "Here, you create an entity type entity type named `penguin_entity_type` using `create_entity_type` with the following parameters:\n",
        "* `entity_type_id (str)`: The ID to use for the EntityType, which will become the final component of the EntityType's resource name. The value must be unique within a Feature Store.\n",
        "* `description`: Description of the EntityType."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "zNzr-FlEr3tI"
      },
      "outputs": [],
      "source": [
        "ENTITY_TYPE_ID = f\"penguin_entity_type_{UUID}\"\n",
        "\n",
        "# Create penguin entity type\n",
        "penguins_entity_type = penguins_feature_store.create_entity_type(\n",
        "    entity_type_id=ENTITY_TYPE_ID,\n",
        "    description=\"Penguins entity type\",\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "CquSdTp7duVw"
      },
      "source": [
        "##### Verify that the EntityType is created\n",
        "Check if the Entity Type was successfully created by running the following code block."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "76ocr_hJsG-t"
      },
      "outputs": [],
      "source": [
        "entity_type = penguins_feature_store.get_entity_type(entity_type_id=ENTITY_TYPE_ID)\n",
        "\n",
        "print(entity_type.gca_resource)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "2vYV2UUFehwZ"
      },
      "source": [
        "### Create Features\n",
        "A feature is a measurable property or attribute of an entity type. For example, `penguin` entity type has features such as `flipper_length_mm`, and `body_mass_g`. Features can be created within each entity type.\n",
        "\n",
        "When you create a feature, you specify its value type such as `DOUBLE`, and `STRING`. This value determines what value types you can ingest for a particular feature.\n",
        "\n",
        "Learn more about [Feature Value Types](https://cloud.google.com/vertex-ai/docs/reference/rest/v1/projects.locations.featurestores.entityTypes.features)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "WQ5EsPPbsSuE"
      },
      "outputs": [],
      "source": [
        "penguins_feature_configs = {\n",
        "    \"species\": {\n",
        "        \"value_type\": \"STRING\",\n",
        "    },\n",
        "    \"island\": {\n",
        "        \"value_type\": \"STRING\",\n",
        "    },\n",
        "    \"culmen_length_mm\": {\n",
        "        \"value_type\": \"DOUBLE\",\n",
        "    },\n",
        "    \"culmen_depth_mm\": {\n",
        "        \"value_type\": \"DOUBLE\",\n",
        "    },\n",
        "    \"flipper_length_mm\": {\n",
        "        \"value_type\": \"DOUBLE\",\n",
        "    },\n",
        "    \"body_mass_g\": {\"value_type\": \"DOUBLE\"},\n",
        "    \"sex\": {\"value_type\": \"STRING\"},\n",
        "}"
      ]
    },
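    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Writing the configuration by hand works well for a handful of features. As a rough sketch, a configuration with the same shape could also be derived from one sample row. The `infer_feature_configs` helper and the sample row are hypothetical and aren't part of the Vertex AI SDK. The mapping assumes only the scalar value types `BOOL`, `INT64`, `DOUBLE`, and `STRING`, so review the result before you create features from it."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from typing import Any, Dict\n",
        "\n",
        "# Assumed mapping from Python scalar types to Feature Store value types.\n",
        "# bool is listed before int because bool is a subclass of int in Python.\n",
        "_VALUE_TYPES = [(bool, \"BOOL\"), (int, \"INT64\"), (float, \"DOUBLE\"), (str, \"STRING\")]\n",
        "\n",
        "\n",
        "def infer_feature_configs(sample_row: Dict[str, Any]) -> Dict[str, Dict[str, str]]:\n",
        "    \"\"\"Return {feature_id: {\"value_type\": ...}} inferred from one sample row.\"\"\"\n",
        "    configs: Dict[str, Dict[str, str]] = {}\n",
        "    for feature_id, value in sample_row.items():\n",
        "        for python_type, value_type in _VALUE_TYPES:\n",
        "            if isinstance(value, python_type):\n",
        "                configs[feature_id] = {\"value_type\": value_type}\n",
        "                break\n",
        "        else:\n",
        "            # No supported scalar type matched this value\n",
        "            raise TypeError(\n",
        "                f\"Unsupported type for feature {feature_id!r}: {type(value)}\"\n",
        "            )\n",
        "    return configs\n",
        "\n",
        "\n",
        "# Hypothetical sample row for illustration\n",
        "example_row = {\"species\": \"Adelie\", \"flipper_length_mm\": 181.0}\n",
        "print(infer_feature_configs(example_row))"
      ]
    },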
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "AKRXJCPijM8w"
      },
      "source": [
        "You can create features either using `create_feature` or `batch_create_features`. Here, for convinience, you have added all feature configs in one variabel, so we use `batch_create_features`."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "tXOI1Onhs46x"
      },
      "outputs": [],
      "source": [
        "penguin_features = penguins_entity_type.batch_create_features(\n",
        "    feature_configs=penguins_feature_configs,\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "WBx26pZItUN4"
      },
      "source": [
        "### Write features to the Feature Store\n",
        "Use the `write_feature_values` API to write a feature to the Feature Store with the following parameter:\n",
        "\n",
        "* `instances`: Feature values to be written to the Feature Store that can take the form of a list of WriteFeatureValuesPayload objects, a Python dict, or a pandas Dataframe.\n",
        "\n",
        "This streaming ingestion feature has been introduced to the Vertex AI SDK under the **preview** namespace. Here, you pass the pandas `Dataframe` you created from penguins dataset as `instances` parameter.\n",
        "\n",
        "Learn more about [Streaming ingestion API](https://github.com/googleapis/python-aiplatform/blob/e6933503d2d3a0f8a8f7ef8c178ed50a69ac2268/google/cloud/aiplatform/preview/featurestore/entity_type.py#L36)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "iUGI-ftltXqE"
      },
      "outputs": [],
      "source": [
        "penguins_entity_type.preview.write_feature_values(instances=penguins_df)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "STq67KHO3q_e"
      },
      "source": [
        "## Read back written features\n",
        "\n",
        "Wait a few seconds for the write to propagate, then do an online read to confirm the write was successful."
      ]
    },
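    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Rather than guessing how long to wait, one option is to poll until a check passes. The sketch below is a generic helper and isn't part of the Vertex AI SDK; `wait_until` and its parameters are hypothetical names. The check you pass in is something you define yourself, for example a function that performs an online read and inspects the result."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import time\n",
        "from typing import Callable\n",
        "\n",
        "\n",
        "def wait_until(\n",
        "    check: Callable[[], bool], timeout_s: float = 60.0, interval_s: float = 5.0\n",
        ") -> bool:\n",
        "    \"\"\"Call check() every interval_s seconds until it returns True or timeout_s elapses.\"\"\"\n",
        "    deadline = time.monotonic() + timeout_s\n",
        "    while True:\n",
        "        if check():\n",
        "            return True\n",
        "        if time.monotonic() >= deadline:\n",
        "            return False\n",
        "        time.sleep(interval_s)\n",
        "\n",
        "\n",
        "# Trivial demonstration with a check that passes immediately\n",
        "print(wait_until(lambda: True))"
      ]
    },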
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "lwoMnze43r9G"
      },
      "outputs": [],
      "source": [
        "ENTITY_IDS = [str(x) for x in range(100)]\n",
        "penguins_entity_type.read(entity_ids=ENTITY_IDS)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Note: The newly created feature store will be only be visible in the Console when \"Vertex AI Feature Store (Legacy)\" is selected from the top right menu. "
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "TpV-iwP9qw9c"
      },
      "source": [
        "## Cleaning up\n",
        "\n",
        "To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud\n",
        "project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.\n",
        "\n",
        "Otherwise, you can delete the individual resources you created in this tutorial:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "sx_vKniMq9ZX"
      },
      "outputs": [],
      "source": [
        "penguins_feature_store.delete(force=True)"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "name": "feature_store_streaming_ingestion_sdk.ipynb",
      "toc_visible": true
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
